Concurrency Tips

Eugene Shelestovich, Fitbit

Plan for today

Concurrency is hard

race.png

Threads

pool.jpg

thread_map.png

Unstoppable app

public static void main(String[] args) {
  ExecutorService executor = Executors.newSingleThreadExecutor();
  executor.submit(() ->
    System.out.println("Background thread finished."));
  System.out.println("Main thread finished.");
}
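
The app above never exits: the executor's worker is a non-daemon thread and nobody shuts the pool down. A minimal JDK-only sketch of the plain fix follows. The class name `StoppableApp` and the 5-second wait are illustrative assumptions, not part of the original slides.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: same app as above, but the pool is shut down explicitly.
final class StoppableApp {

  /** Runs one background task, shuts the pool down, and reports whether it terminated in time. */
  static boolean runAndShutdown() throws InterruptedException {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.submit(() -> System.out.println("Background thread finished."));
    executor.shutdown(); // no new tasks accepted; already submitted tasks still run
    // Bounded wait for the worker to drain (5 seconds is an arbitrary choice).
    return executor.awaitTermination(5, TimeUnit.SECONDS);
  }

  public static void main(String[] args) throws InterruptedException {
    boolean terminated = runAndShutdown();
    System.out.println("Main thread finished, executor terminated: " + terminated);
  }
}
```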

Guava to the rescue

ExecutorService executor = MoreExecutors.getExitingExecutorService(
  (ThreadPoolExecutor) yourExecutor);

that basically does:

executor.setThreadFactory(
  new ThreadFactoryBuilder().setDaemon(true).build());

Interrupt

while ((line = in.readLine()) != null) {
  try {
    // ...
  } catch (InterruptedException e) {
    e.printStackTrace(); // or ignore
  }
}

Correct way

while (!Thread.currentThread().isInterrupted() && ...) {
  try {
    // ...
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
  }
}
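
A runnable sketch of the same pattern, assuming a worker blocked on an empty queue. `InterruptibleWorker` and its method name are made up for illustration. It shows that restoring the flag in the catch block is what lets the loop condition end the thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not from the original slides.
final class InterruptibleWorker {

  /** Starts a worker, interrupts it, and reports whether it exited with the flag still set. */
  static boolean interruptAndCheck() throws InterruptedException {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>(); // stays empty, so take() blocks
    AtomicBoolean flagSetOnExit = new AtomicBoolean(false);

    Thread worker = new Thread(() -> {
      while (!Thread.currentThread().isInterrupted()) {
        try {
          queue.take(); // blocking call that throws InterruptedException
        } catch (InterruptedException e) {
          // Throwing the exception cleared the flag; set it again so the loop sees it.
          Thread.currentThread().interrupt();
        }
      }
      flagSetOnExit.set(Thread.currentThread().isInterrupted());
    });

    worker.start();
    worker.interrupt();
    worker.join(5_000); // bounded wait, arbitrary 5 seconds
    return !worker.isAlive() && flagSetOnExit.get();
  }
}
```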

Poison pill

private static final Message POISON_PILL = new Message(-1);

while (true) {
  Message message = queue.take();
  if (POISON_PILL.equals(message)) {
    return;
  }

  // handle message here ...
}
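
A self-contained sketch of the poison pill hand-off between one producer and one consumer. Assumptions: plain `String` messages stand in for `Message`, and the pill is compared by identity rather than `equals`, so no real message can be mistaken for it. `PoisonPillDemo` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not from the original slides.
final class PoisonPillDemo {
  // new String(...) gives a unique object, so the identity check below cannot match real data.
  private static final String POISON_PILL = new String("POISON_PILL");

  /** Sends all messages followed by the pill and returns what the consumer handled. */
  static List<String> consumeAll(List<String> messages) throws InterruptedException {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    List<String> handled = new ArrayList<>(); // written only by the consumer, read after join()

    Thread consumer = new Thread(() -> {
      try {
        while (true) {
          String message = queue.take();
          if (message == POISON_PILL) {
            return; // producer signalled the end of the stream
          }
          handled.add(message);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    consumer.start();

    for (String message : messages) {
      queue.put(message);
    }
    queue.put(POISON_PILL);
    consumer.join();
    return handled;
  }
}
```

With several consumers, one pill per consumer would be needed.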

Fixed pool, right?

ExecutorService executor = Executors.newFixedThreadPool(8);

while (true) {
  executor.execute(() -> {
    LOG.info(Thread.currentThread().getName() + " is working");
    Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.SECONDS);

    if (i.getAndIncrement() % 4 == 0) { // inject 25% errors
      throw new RuntimeException();
    }
  });
}

Wrong!

pool-1-thread-1 is working
pool-1-thread-2 is working
Exception in thread "pool-1-thread-1" java.lang.RuntimeException
  at UnstoppablePool.lambda$main$0(UnstoppablePool.java:27)
pool-1-thread-4 is working
pool-1-thread-5 is working
pool-1-thread-6 is working
pool-1-thread-8 is working
Exception in thread "pool-1-thread-6" java.lang.RuntimeException
  at UnstoppablePool.lambda$main$0(UnstoppablePool.java:27)
pool-1-thread-9 is working
pool-1-thread-10 is working

Try-catch everything

new Runnable() {
  public void run() {
    try {
      // logic goes here
    } catch (Throwable e) {
      // error handling
    }
  }
}
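
One way to apply this everywhere is a small wrapper around each task. The sketch below is an assumption about how that could look (`SafeTasks`, `safe` and `runFailingTasks` are hypothetical names). It also checks that a one-thread pool keeps reusing the same worker when every task fails inside the wrapper.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class, not from the original slides.
final class SafeTasks {
  static final AtomicInteger failures = new AtomicInteger();

  /** Wraps a task so that anything it throws is recorded instead of killing the worker thread. */
  static Runnable safe(Runnable task) {
    return () -> {
      try {
        task.run();
      } catch (Throwable t) {
        failures.incrementAndGet();
        System.err.println("Task failed: " + t);
      }
    };
  }

  /** Runs n always-failing tasks on a one-thread pool; returns the names of the threads used. */
  static Set<String> runFailingTasks(int n) throws InterruptedException {
    Set<String> names = ConcurrentHashMap.newKeySet();
    ExecutorService pool = Executors.newFixedThreadPool(1);
    for (int i = 0; i < n; i++) {
      pool.execute(safe(() -> {
        names.add(Thread.currentThread().getName());
        throw new RuntimeException("injected failure");
      }));
    }
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
    return names;
  }
}
```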

Global exception handler

At least log an error:

Thread.setDefaultUncaughtExceptionHandler(
  new Thread.UncaughtExceptionHandler() {
    public void uncaughtException(Thread t, Throwable e) {
      LOG.error("Thread [" + t + "] died abruptly", e);
    }
  });

or bail out:

Thread.setDefaultUncaughtExceptionHandler(
  UncaughtExceptionHandlers.systemExit());

Unbounded queue

ExecutorService pool = Executors.newSingleThreadExecutor();

while (true) {
  i.getAndIncrement();
  pool.execute(() -> {
    LOG.info("Tasks in queue: " + i.get());
    LOG.info("Slow consumer working...");
    Uninterruptibles.sleepUninterruptibly(5L, TimeUnit.SECONDS);
  });
}

Tasks in queue: 10406024
Slow consumer working...
java.lang.OutOfMemoryError: GC overhead limit exceeded
disaster_girl.jpg

gc.png

Max threads?

many.png

In fact, not so many

eshelestovich:/$ java TooManyThreads
1
2
...
2023
2024
Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread
  at java.lang.Thread.start0(Native Method)
eshelestovich:/$ sysctl kern.num_taskthreads
kern.num_taskthreads: 2048

Defaults too high

eshelestovich:/$ java -server -XX:+UnlockDiagnosticVMOptions \
  -XX:+PrintFlagsFinal -XX:+PrintCommandLineFlags \
  -version | grep ThreadStackSize

intx ThreadStackSize = 1024

Docker/Mesos issues

-XX:ParallelGCThreads=8
-XX:ConcGCThreads=2

Tune your pools

new ThreadPoolExecutor(
  8, 32,
  60L, TimeUnit.SECONDS, // keepAliveTime
  new ArrayBlockingQueue<>(1024),
  new ThreadPoolExecutor.CallerRunsPolicy()); // backpressure

backpressure.png
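
A small sketch of how `CallerRunsPolicy` pushes work back to the submitter. The pool is deliberately tiny (1 thread, queue of 1) so that the third task overflows. These sizes and the name `BackpressureDemo` are assumptions for the demo only.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not from the original slides.
final class BackpressureDemo {

  /** Returns the name of the thread that ended up running the task that did not fit. */
  static String threadThatRanOverflowTask() throws InterruptedException {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        1, 1,
        60L, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(1),
        new ThreadPoolExecutor.CallerRunsPolicy());
    CountDownLatch release = new CountDownLatch(1);
    AtomicReference<String> overflowThread = new AtomicReference<>();

    pool.execute(() -> awaitQuietly(release)); // occupies the only worker
    pool.execute(() -> { });                   // fills the single queue slot
    // Pool and queue are full: the policy runs this task on the submitting thread.
    pool.execute(() -> overflowThread.set(Thread.currentThread().getName()));

    release.countDown();
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
    return overflowThread.get();
  }

  private static void awaitQuietly(CountDownLatch latch) {
    try {
      latch.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

While the caller is busy running the overflow task it cannot submit more, which is the backpressure effect.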

Optimal thread count

For CPU-bound tasks, Brian Goetz recommends:

threads = number of CPUs + 1

For mixed workloads, Subramaniam & Goetz agree on:

threads = number of CPUs * (1 + wait time / service time)

E.g. with an 8-core CPU on a 30% I/O workload:

8 * (1 + 30 / 70) ≈ 11 threads
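
The mixed-workload formula as a tiny helper, rounding to the nearest whole thread. `PoolSizing` and `optimalThreads` are hypothetical names. Wait and service time only need to share a unit, since only their ratio matters.

```java
// Hypothetical helper class, not from the original slides.
final class PoolSizing {

  /** threads = cpus * (1 + waitTime / serviceTime), rounded to the nearest integer. */
  static int optimalThreads(int cpus, double waitTime, double serviceTime) {
    return (int) Math.round(cpus * (1 + waitTime / serviceTime));
  }

  public static void main(String[] args) {
    // 8 cores, 30% of the time waiting on I/O, 70% computing.
    System.out.println(optimalThreads(8, 30, 70));
  }
}
```

In a real service the CPU count would more likely come from `Runtime.getRuntime().availableProcessors()`, and the ratio from measurements.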

amdahls_law.jpg

If 95% of the program is parallelizable, the theoretical maximum speedup is only 20x, no matter how many processors are added.
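
For reference, the 20x limit follows from Amdahl's law. Here p is the parallelizable fraction and n the number of processors. The symbols are chosen for this note and are not taken from the slide image.

```latex
S(n) = \frac{1}{(1 - p) + \frac{p}{n}},
\qquad
\lim_{n \to \infty} S(n) = \frac{1}{1 - p} = \frac{1}{1 - 0.95} = 20
```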

Little’s Law

littles_law.jpg

In order to maintain throughput of 500 req/sec with average latency of 150 ms, we need to allocate 75 parallel workers (threads/processes).

It works the other way around too!
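
Little's Law (concurrency = throughput * latency) in both directions, as a sketch. `LittlesLaw` and its method names are hypothetical. Latency is taken in milliseconds to keep the arithmetic exact for the numbers above.

```java
// Hypothetical helper class, not from the original slides.
final class LittlesLaw {

  /** Workers needed to sustain the given throughput at the given average latency. */
  static int workersNeeded(int requestsPerSecond, int latencyMillis) {
    return (int) Math.ceil(requestsPerSecond * latencyMillis / 1000.0);
  }

  /** The other way around: throughput (req/sec) a fixed number of workers can sustain. */
  static double maxThroughput(int workers, int latencyMillis) {
    return workers * 1000.0 / latencyMillis;
  }

  public static void main(String[] args) {
    System.out.println(workersNeeded(500, 150)); // 500 req/sec at 150 ms
    System.out.println(maxThroughput(75, 150));  // 75 workers at 150 ms
  }
}
```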

Default names

Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.92-b14 mixed mode):
"pool-1-thread-32" #42 prio=5
"pool-1-thread-31" #41 prio=5
"pool-1-thread-30" #40 prio=5
"pool-1-thread-29" #39 prio=5
"pool-1-thread-28" #38 prio=5
"pool-1-thread-27" #37 prio=5
"pool-1-thread-26" #36 prio=5
...

Complicates troubleshooting

Heisenberg

ThreadFactory factory = new ThreadFactoryBuilder()
  .setNameFormat("my-kafka-pool-%d").build();
ExecutorService pool = Executors.newFixedThreadPool(4, factory);
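
Without Guava, a similar effect can be had with a few lines of JDK code. This is a sketch under that assumption, not what `ThreadFactoryBuilder` does internally. `NamedThreadFactory` is a hypothetical class, numbering from 0.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only factory, not from the original slides.
final class NamedThreadFactory implements ThreadFactory {
  private final String prefix;
  private final AtomicInteger counter = new AtomicInteger();

  NamedThreadFactory(String prefix) {
    this.prefix = prefix;
  }

  @Override
  public Thread newThread(Runnable task) {
    // Produces names like "my-kafka-pool-0", "my-kafka-pool-1", ...
    return new Thread(task, prefix + "-" + counter.getAndIncrement());
  }
}
```

Usage would mirror the snippet above: `Executors.newFixedThreadPool(4, new NamedThreadFactory("my-kafka-pool"))`.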

Summary

Threads section summary…

Locks

mutex.png

Lock on mutable object

private Object[] items = new Object[] {};

private ArrayCopy add(Object newItem) {
  synchronized (items) {
    Object[] newArray = new Object[items.length + 1];
    ...
    items = newArray; // lock object mutation
  }
  return this;
}
Bug in Tomcat (46990)
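
A sketch of the usual fix: lock on a dedicated `final` object that is never reassigned, so every thread always contends on the same monitor. The completed copy logic and the `size()` accessor are filled in for illustration only.

```java
import java.util.Arrays;

// Illustrative completion of the snippet above, not the actual Tomcat code.
final class ArrayCopy {
  private final Object lock = new Object(); // never reassigned
  private Object[] items = new Object[] {};  // guarded by lock

  ArrayCopy add(Object newItem) {
    synchronized (lock) {
      Object[] newArray = Arrays.copyOf(items, items.length + 1);
      newArray[items.length] = newItem;
      items = newArray; // safe: the lock object itself does not change
    }
    return this;
  }

  int size() {
    synchronized (lock) {
      return items.length;
    }
  }
}
```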

Lock on literals

private final String lock = "LOCK";

public void doSomething() {
  synchronized (lock) { ... }
}

private final Integer lock = 42;

public void doSomething() {
  synchronized (lock) { ... }
}
Bug in Jetty
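
Why literals are dangerous as locks: string literals are interned and small boxed integers are cached, so unrelated classes can end up sharing one monitor. The sketch below demonstrates both. All class names are hypothetical.

```java
// Two unrelated (hypothetical) components that each think they own a private lock.
final class ComponentA {
  static final String LOCK = "LOCK";
}

final class ComponentB {
  static final String LOCK = "LOCK";
}

final class LiteralLocks {

  /** Equal string literals are interned, so both fields point at the same object. */
  static boolean stringLiteralsShareOneMonitor() {
    return ComponentA.LOCK == ComponentB.LOCK;
  }

  /** Autoboxed values in the range -128..127 come from a shared cache. */
  static boolean boxedIntegersShareOneMonitor() {
    Integer a = 42;
    Integer b = 42;
    return a == b;
  }
}

// The usual fix: a dedicated object nobody else can obtain a reference to.
final class SafeComponent {
  private final Object lock = new Object();

  void doSomething() {
    synchronized (lock) {
      // critical section
    }
  }
}
```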

Lock scope

private static SysCtx ctx; // static shared state

public synchronized SysCtx getSysCtx() { // locks on this, but ctx is static: each instance uses a different lock
  if (ctx == null) {
    ctx = new SysCtx();
  }
  return ctx;
}
Bug in Android (12015587)
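
A sketch of one possible fix: guard the static field with a class-wide lock, so that all instances contend on the same monitor. `SysCtxHolder` and the empty `SysCtx` are placeholders for illustration, not the Android code.

```java
// Hypothetical holder class, not from the original slides.
final class SysCtxHolder {
  static final class SysCtx { }

  private static SysCtx ctx; // static shared state

  SysCtx getSysCtx() {
    synchronized (SysCtxHolder.class) { // one class-wide lock for class-wide state
      if (ctx == null) {
        ctx = new SysCtx();
      }
      return ctx;
    }
  }
}
```

Declaring the method `static synchronized` would take the same class-level lock.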

I/O under lock

synchronized (this) {
  Future<User> user = remoteService.getUser(id); // network I/O
  // ...
}
Bug in Log4j (41214). Set timeouts explicitly!
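
A sketch of keeping the blocking wait outside the lock and bounding it with an explicit timeout. `UserCache`, `refresh` and the `String` user are illustrative assumptions. The `Future` stands in for whatever the remote call returns.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical class, not from the original slides.
final class UserCache {
  private final Object lock = new Object();
  private String lastUser; // guarded by lock

  /** Waits for the remote result without holding the lock, then publishes it under the lock. */
  String refresh(Future<String> remoteCall, long timeoutMillis)
      throws InterruptedException, ExecutionException, TimeoutException {
    // Slow part: no lock held, and the wait cannot exceed the timeout.
    String user = remoteCall.get(timeoutMillis, TimeUnit.MILLISECONDS);
    synchronized (lock) {
      lastUser = user; // fast part: only the shared-state update is serialized
    }
    return user;
  }

  String lastUser() {
    synchronized (lock) {
      return lastUser;
    }
  }
}
```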

ReentrantLock

synchronized

Lock Coarsening, -XX:+EliminateLocks

public String getNames(StringBuffer sb) {
  sb.append("Alice");
  sb.append("Claire");
  return sb.toString();
}

Lock Elision, -XX:+DoEscapeAnalysis

public String getNames() {
  StringBuffer sb = new StringBuffer();
  sb.append("Alice");
  sb.append(someName);
  sb.append("Barbara");
  return sb.toString();
}

Summary

Serialization hurts scalability.
Context switches hurt performance.
Contended locking causes both. Avoid it!

Tactical Tips

Lock Striping

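private final Lock[] locks = new ReentrantLock[16];
{ for (int i = 0; i < locks.length; i++) locks[i] = new ReentrantLock(); } // slots start out null

public User getUser(int id) {
  Lock lock = locks[Math.floorMod(id, locks.length)]; // non-negative index even for negative ids
  lock.lock();
  try {
    ...
  } finally {
    lock.unlock();
  }
}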

Strategic Tips

Alternatives

Static code analysis

idea.png
warning.png

questions.jpg
